/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
package org.unidata.mdm.dq.core.service.impl.instance;

import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.jgrapht.graph.DirectedMultigraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.unidata.mdm.core.type.data.ArrayAttribute;
import org.unidata.mdm.core.type.data.Attribute;
import org.unidata.mdm.core.type.data.SimpleAttribute;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionOutputParam;
import org.unidata.mdm.dq.core.type.composite.CompositeFunctionNodeType;
import org.unidata.mdm.dq.core.type.measurement.DqMeasurementCategory;
import org.unidata.mdm.dq.core.type.model.instance.CompositeFunctionElement;
import org.unidata.mdm.dq.core.type.model.instance.DataQualityInstance;
import org.unidata.mdm.dq.core.type.model.source.CompositeCleanseFunctionNode;
import org.unidata.mdm.dq.core.type.model.source.CompositeCleanseFunctionSource;
import org.unidata.mdm.dq.core.type.model.source.CompositeCleanseFunctionTransition;
import org.unidata.mdm.dq.core.util.DQUtils;
import org.unidata.mdm.system.type.runtime.MeasurementPoint;

/**
 * @author Mikhail Mikhailov on Jan 28, 2021
 * Java function implementation.
 */
public class CompositeCleanseFunctionImpl
    extends AbstractCleanseFunctionImpl<CompositeCleanseFunctionSource>
    implements CompositeFunctionElement {
    /**
     * Function graph to traverse.
     */
    private final DirectedMultigraph<CompositeCleanseFunctionNode, CompositeCleanseFunctionTransition> graph;
    /**
     * Our instance.
     */
    private final DataQualityInstance instance;
    /**
     * Constructor.
     * @param instance the instance
     * @param acfs the function definition
     */
    public CompositeCleanseFunctionImpl(DataQualityInstance instance, CompositeCleanseFunctionSource ccfs) {
        super(ccfs);
        this.graph = DQUtils.ofSource(ccfs);
        this.instance = instance;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isCompositeFunction() {
        return true;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CompositeFunctionElement getCompositeFunction() {
        return this;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext input) {

        MeasurementPoint.start(DqMeasurementCategory.COMPOSITE_CLEANSE_FUNCTION.name(), input.getFunctionName());
        try {

            CleanseFunctionResult output = new CleanseFunctionResult();

            TopologicalOrderIterator<CompositeCleanseFunctionNode, CompositeCleanseFunctionTransition> iterator = new TopologicalOrderIterator<>(graph);
            Map<CompositeCleanseFunctionTransition, CompositeFunctionParam> registers = new IdentityHashMap<>();
            while (iterator.hasNext()) {

                CompositeCleanseFunctionNode vertex = iterator.next();
                if (CompositeFunctionNodeType.INPUT_PORTS == vertex.getNodeType()) {
                    execInput(input, registers, vertex);
                } else if (CompositeFunctionNodeType.OUTPUT_PORTS == vertex.getNodeType()) {
                    execOutput(output, registers, vertex);
                } else if (CompositeFunctionNodeType.FUNCTION == vertex.getNodeType()) {
                    execFunction(input, output, registers, vertex);
                } else if (CompositeFunctionNodeType.CONSTANT == vertex.getNodeType()) {
                    execConstant(registers, vertex);
                } else if (CompositeFunctionNodeType.IF_THEN_ELSE == vertex.getNodeType()) {
                    execIfThenElse(registers, vertex);
                }
            }

            return output;
        } finally {
            MeasurementPoint.stop();
        }
    }
    /*
     * Executes input ports.
     */
    private void execInput(
            CleanseFunctionContext input,
            Map<CompositeCleanseFunctionTransition, CompositeFunctionParam> registers,
            CompositeCleanseFunctionNode vertex) {

        Set<CompositeCleanseFunctionTransition> outgoing = graph.outgoingEdgesOf(vertex);
        for (CompositeCleanseFunctionTransition transition : outgoing) {
            registers.put(transition, new CompositeFunctionParam(input.getInputParam(transition.getFromPort())));
        }
    }
    /*
     * Executes output ports.
     */
    private void execOutput(
            CleanseFunctionResult output,
            Map<CompositeCleanseFunctionTransition, CompositeFunctionParam> registers,
            CompositeCleanseFunctionNode vertex) {

        Set<CompositeCleanseFunctionTransition> incoming = graph.incomingEdgesOf(vertex);
        for (CompositeCleanseFunctionTransition transition : incoming) {

            CompositeFunctionParam cfp = registers.get(transition);
            if (cfp == null) {
                continue;
            }

            output.putOutputParam(cfp.toOutputParam(transition.getToPort()));
        }
    }
    /*
     * Sets constant values, converting them to attributes.
     */
    private void execConstant(
            Map<CompositeCleanseFunctionTransition, CompositeFunctionParam> registers,
            CompositeCleanseFunctionNode vertex) {

        Set<CompositeCleanseFunctionTransition> outgoing = graph.outgoingEdgesOf(vertex);
        for (CompositeCleanseFunctionTransition transition : outgoing) {
            CleanseFunctionInputParam param = DQUtils.ofConstant(vertex.getConstant(), transition.getToPort(), true);
            registers.put(transition, new CompositeFunctionParam(param));
        }
    }
    /*
     * Executes simple cleanse function.
     */
    private void execFunction(
            CleanseFunctionContext input,
            CleanseFunctionResult output,
            Map<CompositeCleanseFunctionTransition, CompositeFunctionParam> registers,
            CompositeCleanseFunctionNode vertex) {

        List<CleanseFunctionInputParam> params = new ArrayList<>();
        Set<CompositeCleanseFunctionTransition> incoming = graph.incomingEdgesOf(vertex);
        for (CompositeCleanseFunctionTransition transition : incoming) {

            CompositeFunctionParam cfp = registers.get(transition);
            if (cfp == null) {
                continue;
            }

            params.add(cfp.toInputParam(transition.getToPort()));
        }

        if (CollectionUtils.isNotEmpty(params)) {

            CleanseFunctionContext local = CleanseFunctionContext.builder(input)
                    .functionName(vertex.getFunctionName())
                    .input(params)
                    .build();

            CleanseFunctionResult intermediate = instance
                    .getFunction(vertex.getFunctionName())
                    .execute(local);

            // Copy intermediate to output.
            output.addSpots(intermediate.getSpots());
            output.addErrors(intermediate.getErrors());

            Set<CompositeCleanseFunctionTransition> outgoing = graph.outgoingEdgesOf(vertex);
            for (CompositeCleanseFunctionTransition transition : outgoing) {
                registers.put(transition, new CompositeFunctionParam(intermediate.getOutputParam(transition.getFromPort())));
            }
        }
    }
    /*
     * Executes IF-THEN-ELSE condition.
     */
    private void execIfThenElse(
            Map<CompositeCleanseFunctionTransition, CompositeFunctionParam> registers,
            CompositeCleanseFunctionNode vertex) {

        Set<CompositeCleanseFunctionTransition> incoming = graph.incomingEdgesOf(vertex);
        Set<CompositeCleanseFunctionTransition> outgoing = graph.outgoingEdgesOf(vertex);

        CompositeCleanseFunctionTransition condition = null;
        for (CompositeCleanseFunctionTransition t : incoming) {
            if (t.getToPortType() == CompositeFunctionNodeType.CONDITION) {
                condition = t;
                break;
            }
        }

        Objects.requireNonNull(condition, "Condition input must not be null.");

        CompositeFunctionParam value = registers.get(condition);
        Objects.requireNonNull(value, "Condition value must not evaluate to null.");

        boolean state = Boolean.TRUE.equals(value.extractSingletonValue());
        CompositeFunctionNodeType filter = state ? CompositeFunctionNodeType.OUTPUT_TRUE : CompositeFunctionNodeType.OUTPUT_FALSE;

        Map<String, CompositeCleanseFunctionTransition> input = incoming.stream()
                .filter(n -> n.getToPortType() == CompositeFunctionNodeType.INPUT)
                .collect(Collectors.toMap(CompositeCleanseFunctionTransition::getToPort, Function.identity()));

        Set<CompositeCleanseFunctionTransition> output = outgoing.stream()
                .filter(n -> n.getFromPortType() == filter)
                .collect(Collectors.toSet());

        for (CompositeCleanseFunctionTransition transition : output) {

            CompositeCleanseFunctionTransition in = input.get(transition.getFromPort());
            CompositeFunctionParam out = registers.get(in);

            Objects.requireNonNull(out, "Output param must not be null.");

            registers.put(transition, new CompositeFunctionParam(out.toOutputParam(transition.getFromPort())));
        }
    }
    /*
     * Sets the function implementation and calculates state.
     */
    public void implement() {
        source
            .withReady(true)
            .withSystem(false)
            .withConfigurable(true);
    }
    /**
     * @author Mikhail Mikhailov
     * Input/Output/Intermediate values holder.
     */
    private class CompositeFunctionParam {
        /**
         * The values hold.
         */
        private List<Attribute> attributes;
        /**
         * Constructor.
         */
        public CompositeFunctionParam(CleanseFunctionInputParam ip) {
            super();
            this.attributes = Objects.isNull(ip) || ip.isEmpty() ? Collections.emptyList() : ip.getAttributes();
        }
        /**
         * Constructor.
         */
        public CompositeFunctionParam(CleanseFunctionOutputParam op) {
            super();
            this.attributes = Objects.isNull(op) ? Collections.emptyList() : Collections.singletonList(op.getSingleton());
        }
        /**
         * Produces input param of the value hold.
         * @param portName the port name to use
         * @return input param
         */
        public CleanseFunctionInputParam toInputParam(String portName) {
            return CleanseFunctionInputParam.of(portName, attributes);
        }
        /**
         * Produces output param of the value hold.
         * @param portName the port name to use
         * @return output param
         */
        public CleanseFunctionOutputParam toOutputParam(String portName) {
            return CleanseFunctionOutputParam.of(portName, attributes.isEmpty() ? null : attributes.get(0));
        }
        /**
         * Tells whether this value is empty.
         * @return true, if this value is empty
         */
        public boolean isEmpty() {
            return CollectionUtils.isEmpty(attributes);
        }
        /**
         * Extracts single value.
         * @return single value
         */
        public Object extractSingletonValue() {

            Attribute attribute = isEmpty()
                    ? null
                    : attributes.get(0);

            if (Objects.isNull(attribute)) {
                return null;
            }

            switch (attribute.getAttributeType()) {
            case ARRAY:
                ArrayAttribute<?> array = (ArrayAttribute<?>) attribute;
                return array.isEmpty() ? null : array.getValues().get(0);
            case SIMPLE:
                SimpleAttribute<?> simple = (SimpleAttribute<?>) attribute;
                return simple.isEmpty() ? null : simple.getValue();
            default:
                break;
            }

            return null;
        }
    }
}
